/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.hadoop.mapred;

import java.io.IOException;

import java.net.URL;
import java.net.URLDecoder;
import java.util.Enumeration;
import java.util.regex.Pattern;

import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.mapreduce.filecache.DistributedCache;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;
import org.apache.hadoop.conf.Configuration;

import org.apache.hadoop.io.*;
import org.apache.hadoop.io.compress.CompressionCodec;

import org.apache.hadoop.mapred.lib.IdentityMapper;
import org.apache.hadoop.mapred.lib.IdentityReducer;
import org.apache.hadoop.mapred.lib.HashPartitioner;
import org.apache.hadoop.mapred.lib.KeyFieldBasedComparator;
import org.apache.hadoop.mapred.lib.KeyFieldBasedPartitioner;
import org.apache.hadoop.mapreduce.MRConfig;
import org.apache.hadoop.mapreduce.util.ConfigUtil;
import org.apache.hadoop.util.ReflectionUtils;
import org.apache.hadoop.util.Tool;
import org.apache.log4j.Level;

/**
 * A map/reduce job configuration.
 * 
 * <p>
 * <code>JobConf</code> is the primary interface for a user to describe a
 * map-reduce job to the Hadoop framework for execution. The framework tries to
 * faithfully execute the job as-is described by <code>JobConf</code>, however:
 * <ol>
 * <li>
 * Some configuration parameters might have been marked as <a href="{@docRoot}
 * /org/apache/hadoop/conf/Configuration.html#FinalParams"> final</a> by
 * administrators and hence cannot be altered.</li>
 * <li>
 * While some job parameters are straight-forward to set (e.g.
 * {@link #setNumReduceTasks(int)}), some parameters interact subtly with the
 * rest of the framework and/or job-configuration and are relatively more
 * complex for the user to control finely (e.g. {@link #setNumMapTasks(int)}).</li>
 * </ol>
 * </p>
 * 
 * <p>
 * <code>JobConf</code> typically specifies the {@link Mapper}, combiner (if
 * any), {@link Partitioner}, {@link Reducer}, {@link InputFormat} and
 * {@link OutputFormat} implementations to be used etc.
 * 
 * <p>
 * Optionally <code>JobConf</code> is used to specify other advanced facets of
 * the job such as <code>Comparator</code>s to be used, files to be put in the
 * {@link DistributedCache}, whether or not intermediate and/or job outputs are
 * to be compressed (and how), debugability via user-provided scripts (
 * {@link #setMapDebugScript(String)}/{@link #setReduceDebugScript(String)}),
 * for doing post-processing on task logs, task's stdout, stderr, syslog,
 * etc.
 * </p>
 * 
 * <p>
 * Here is an example on how to configure a job via <code>JobConf</code>:
 * </p>
 * <p>
 * <blockquote>
 * 
 * <pre>
 * // Create a new JobConf
 * JobConf job = new JobConf(new Configuration(), MyJob.class);
 * 
 * // Specify various job-specific parameters
 * job.setJobName(&quot;myjob&quot;);
 * 
 * FileInputFormat.setInputPaths(job, new Path(&quot;in&quot;));
 * FileOutputFormat.setOutputPath(job, new Path(&quot;out&quot;));
 * 
 * job.setMapperClass(MyJob.MyMapper.class);
 * job.setCombinerClass(MyJob.MyReducer.class);
 * job.setReducerClass(MyJob.MyReducer.class);
 * 
 * job.setInputFormat(SequenceFileInputFormat.class);
 * job.setOutputFormat(SequenceFileOutputFormat.class);
 * </pre>
 * 
 * </blockquote>
 * </p>
 * 
 * @see JobClient
 * @see ClusterStatus
 * @see Tool
 * @see DistributedCache
 */
@InterfaceAudience.Public
@InterfaceStability.Stable
@SuppressWarnings({ "deprecation", "rawtypes" })
public class JobConf extends Configuration {

	private static final Log LOG = LogFactory.getLog(JobConf.class);

	// Invokes ConfigUtil.loadResources() once, when this class is initialized,
	// i.e. before any JobConf instance can be constructed.
	static {
		ConfigUtil.loadResources();
	}

	/**
	 * @deprecated Use {@link #MAPRED_JOB_MAP_MEMORY_MB_PROPERTY} and
	 *             {@link #MAPRED_JOB_REDUCE_MEMORY_MB_PROPERTY}
	 */
	@Deprecated
	public static final String MAPRED_TASK_MAXVMEM_PROPERTY = "mapred.task.maxvmem";

	/**
	 * @deprecated
	 */
	@Deprecated
	public static final String UPPER_LIMIT_ON_TASK_VMEM_PROPERTY = "mapred.task.limit.maxvmem";

	/**
	 * @deprecated
	 */
	@Deprecated
	public static final String MAPRED_TASK_DEFAULT_MAXVMEM_PROPERTY = "mapred.task.default.maxvmem";

	/**
	 * @deprecated
	 */
	@Deprecated
	public static final String MAPRED_TASK_MAXPMEM_PROPERTY = "mapred.task.maxpmem";

	/**
	 * A value which if set for memory related configuration options, indicates
	 * that the options are turned off.
	 */
	public static final long DISABLED_MEMORY_LIMIT = -1L;

	/**
	 * Property name for the configuration property mapreduce.cluster.local.dir
	 */
	public static final String MAPRED_LOCAL_DIR_PROPERTY = MRConfig.LOCAL_DIR;

	/**
	 * Name of the queue to which jobs will be submitted, if no queue name is
	 * mentioned.
	 */
	public static final String DEFAULT_QUEUE_NAME = "default";

	/**
	 * Configuration key read by {@link #getMemoryForMapTask()} for the memory
	 * (in MB) required by a map task; an alias of
	 * {@link JobContext#MAP_MEMORY_MB}.
	 */
	static final String MAPRED_JOB_MAP_MEMORY_MB_PROPERTY = JobContext.MAP_MEMORY_MB;

	/**
	 * Configuration key read by {@link #getMemoryForReduceTask()} for the
	 * memory (in MB) required by a reduce task; an alias of
	 * {@link JobContext#REDUCE_MEMORY_MB}.
	 */
	static final String MAPRED_JOB_REDUCE_MEMORY_MB_PROPERTY = JobContext.REDUCE_MEMORY_MB;

	/** Pattern for the default unpacking behavior for job jars */
	public static final Pattern UNPACK_JAR_PATTERN_DEFAULT = Pattern
			.compile("(?:classes/|lib/).*");

	/**
	 * Configuration key to set the java command line options for the child map
	 * and reduce tasks.
	 * 
	 * Java opts for the task tracker child processes. The following symbol, if
	 * present, will be interpolated: @taskid@. It is replaced by current
	 * TaskID. Any other occurrences of '@' will go unchanged. For example, to
	 * enable verbose gc logging to a file named for the taskid in /tmp and to
	 * set the heap maximum to be a gigabyte, pass a 'value' of: -Xmx1024m
	 * -verbose:gc -Xloggc:/tmp/@taskid@.gc
	 * 
	 * The configuration variable {@link #MAPRED_TASK_ULIMIT} can be used to
	 * control the maximum virtual memory of the child processes.
	 * 
	 * The configuration variable {@link #MAPRED_TASK_ENV} can be used to pass
	 * other environment variables to the child processes.
	 * 
	 * @deprecated Use {@link #MAPRED_MAP_TASK_JAVA_OPTS} or
	 *             {@link #MAPRED_REDUCE_TASK_JAVA_OPTS}
	 */
	@Deprecated
	public static final String MAPRED_TASK_JAVA_OPTS = "mapred.child.java.opts";

	/**
	 * Configuration key to set the java command line options for the map tasks.
	 * 
	 * Java opts for the task tracker child map processes. The following symbol,
	 * if present, will be interpolated: @taskid@. It is replaced by current
	 * TaskID. Any other occurrences of '@' will go unchanged. For example, to
	 * enable verbose gc logging to a file named for the taskid in /tmp and to
	 * set the heap maximum to be a gigabyte, pass a 'value' of: -Xmx1024m
	 * -verbose:gc -Xloggc:/tmp/@taskid@.gc
	 * 
	 * The configuration variable {@link #MAPRED_MAP_TASK_ULIMIT} can be used to
	 * control the maximum virtual memory of the map processes.
	 * 
	 * The configuration variable {@link #MAPRED_MAP_TASK_ENV} can be used to
	 * pass other environment variables to the map processes.
	 */
	public static final String MAPRED_MAP_TASK_JAVA_OPTS = JobContext.MAP_JAVA_OPTS;

	/**
	 * Configuration key to set the java command line options for the reduce
	 * tasks.
	 * 
	 * Java opts for the task tracker child reduce processes. The following
	 * symbol, if present, will be interpolated: @taskid@. It is replaced by
	 * current TaskID. Any other occurrences of '@' will go unchanged. For
	 * example, to enable verbose gc logging to a file named for the taskid in
	 * /tmp and to set the heap maximum to be a gigabyte, pass a 'value' of:
	 * -Xmx1024m -verbose:gc -Xloggc:/tmp/@taskid@.gc
	 * 
	 * The configuration variable {@link #MAPRED_REDUCE_TASK_ULIMIT} can be used
	 * to control the maximum virtual memory of the reduce processes.
	 * 
	 * The configuration variable {@link #MAPRED_REDUCE_TASK_ENV} can be used to
	 * pass process environment variables to the reduce processes.
	 */
	public static final String MAPRED_REDUCE_TASK_JAVA_OPTS = JobContext.REDUCE_JAVA_OPTS;

	/**
	 * Default java command line options (a 200 MB maximum heap). Presumably
	 * used for child tasks when none of the java-opts keys is configured --
	 * TODO confirm against the users of this constant.
	 */
	public static final String DEFAULT_MAPRED_TASK_JAVA_OPTS = "-Xmx200m";

	/**
	 * Configuration key to set the maximum virtual memory available to the
	 * child map and reduce tasks (in kilo-bytes).
	 * 
	 * Note: This must be greater than or equal to the -Xmx passed to the JavaVM
	 * via {@link #MAPRED_TASK_JAVA_OPTS}, else the VM might not start.
	 * 
	 * @deprecated Use {@link #MAPRED_MAP_TASK_ULIMIT} or
	 *             {@link #MAPRED_REDUCE_TASK_ULIMIT}
	 */
	@Deprecated
	public static final String MAPRED_TASK_ULIMIT = "mapred.child.ulimit";

	/**
	 * Configuration key to set the maximum virtual memory available to the map
	 * tasks (in kilo-bytes).
	 * 
	 * Note: This must be greater than or equal to the -Xmx passed to the JavaVM
	 * via {@link #MAPRED_MAP_TASK_JAVA_OPTS}, else the VM might not start.
	 */
	public static final String MAPRED_MAP_TASK_ULIMIT = JobContext.MAP_ULIMIT;

	/**
	 * Configuration key to set the maximum virtual memory available to the
	 * reduce tasks (in kilo-bytes).
	 * 
	 * Note: This must be greater than or equal to the -Xmx passed to the JavaVM
	 * via {@link #MAPRED_REDUCE_TASK_JAVA_OPTS}, else the VM might not start.
	 */
	public static final String MAPRED_REDUCE_TASK_ULIMIT = JobContext.REDUCE_ULIMIT;

	/**
	 * Configuration key to set the environment of the child map/reduce tasks.
	 * 
	 * The format of the value is <code>k1=v1,k2=v2</code>. Further it can
	 * reference existing environment variables via <code>$key</code>.
	 * 
	 * Example:
	 * <ul>
	 * <li>A=foo - This will set the env variable A to foo.</li>
	 * <li>B=$X:c - This will inherit the tasktracker's X env variable.</li>
	 * </ul>
	 * 
	 * @deprecated Use {@link #MAPRED_MAP_TASK_ENV} or
	 *             {@link #MAPRED_REDUCE_TASK_ENV}
	 */
	@Deprecated
	public static final String MAPRED_TASK_ENV = "mapred.child.env";

	/**
	 * Configuration key to set the environment of the child map tasks.
	 * 
	 * The format of the value is <code>k1=v1,k2=v2</code>. Further it can
	 * reference existing environment variables via <code>$key</code>.
	 * 
	 * Example:
	 * <ul>
	 * <li>A=foo - This will set the env variable A to foo.</li>
	 * <li>B=$X:c - This will inherit the tasktracker's X env variable.</li>
	 * </ul>
	 */
	public static final String MAPRED_MAP_TASK_ENV = JobContext.MAP_ENV;

	/**
	 * Configuration key to set the environment of the child reduce tasks.
	 * 
	 * The format of the value is <code>k1=v1,k2=v2</code>. Further it can
	 * reference existing environment variables via <code>$key</code>.
	 * 
	 * Example:
	 * <ul>
	 * <li>A=foo - This will set the env variable A to foo.</li>
	 * <li>B=$X:c - This will inherit the tasktracker's X env variable.</li>
	 * </ul>
	 */
	public static final String MAPRED_REDUCE_TASK_ENV = JobContext.REDUCE_ENV;

	/**
	 * Configuration key to set the logging {@link Level} for the map task.
	 * 
	 * The allowed logging levels are: OFF, FATAL, ERROR, WARN, INFO, DEBUG,
	 * TRACE and ALL.
	 */
	public static final String MAPRED_MAP_TASK_LOG_LEVEL = JobContext.MAP_LOG_LEVEL;

	/**
	 * Configuration key to set the logging {@link Level} for the reduce task.
	 * 
	 * The allowed logging levels are: OFF, FATAL, ERROR, WARN, INFO, DEBUG,
	 * TRACE and ALL.
	 */
	public static final String MAPRED_REDUCE_TASK_LOG_LEVEL = JobContext.REDUCE_LOG_LEVEL;

	/**
	 * Default logging level for map/reduce tasks.
	 */
	public static final Level DEFAULT_LOG_LEVEL = Level.INFO;

	/**
	 * Builds the standard "no longer used" notice for a configuration key.
	 * 
	 * @param key
	 *            name of the deprecated configuration variable.
	 * @return the notice text naming <code>key</code>.
	 */
	static String deprecatedString(String key) {
		StringBuilder notice = new StringBuilder("The variable ");
		notice.append(key).append(" is no longer used.");
		return notice.toString();
	}

	/**
	 * Find a jar that contains a class of the same name, if any. It will return
	 * a jar file, even if that is not the first thing on the class path that
	 * has a class with the same name.
	 * 
	 * @param my_class
	 *            the class to find.
	 * @return the decoded path of a jar file that contains the class, or
	 *         <code>null</code> if the class has no class loader (bootstrap
	 *         classes) or is not served from a jar.
	 * @throws RuntimeException
	 *             wrapping the {@link IOException} raised while enumerating
	 *             the class loader's resources.
	 */
	private static String findContainingJar(Class my_class) {
		ClassLoader loader = my_class.getClassLoader();
		if (loader == null) {
			// Classes loaded by the bootstrap loader report a null class
			// loader; there is no user jar to find for them.
			return null;
		}
		String class_file = my_class.getName().replace('.', '/') + ".class";
		try {
			Enumeration<URL> itr = loader.getResources(class_file);
			while (itr.hasMoreElements()) {
				URL url = itr.nextElement();
				if (!"jar".equals(url.getProtocol())) {
					continue;
				}
				String toReturn = url.getPath();
				if (toReturn.startsWith("file:")) {
					toReturn = toReturn.substring("file:".length());
				}
				// URLDecoder decodes application/x-www-form-urlencoded data,
				// where '+' means a space. A '+' in a file path is a literal
				// character, so protect it before decoding.
				toReturn = toReturn.replaceAll("\\+", "%2B");
				toReturn = URLDecoder.decode(toReturn, "UTF-8");
				// Strip the "!/path/inside/jar" suffix of the jar URL.
				return toReturn.replaceAll("!.*$", "");
			}
		} catch (IOException e) {
			throw new RuntimeException(e);
		}
		return null;
	}

	/**
	 * Normalize a memory configuration value: every negative value is mapped
	 * to {@link #DISABLED_MEMORY_LIMIT}, anything else is returned unchanged.
	 * 
	 * @param val
	 *            the raw configured value.
	 * @return normalized value
	 */
	public static long normalizeMemoryConfigValue(long val) {
		return (val < 0) ? DISABLED_MEMORY_LIMIT : val;
	}

	/**
	 * Construct a map/reduce job configuration using the superclass's
	 * no-argument constructor, then warn about deprecated settings.
	 */
	public JobConf() {
		super();
		this.checkAndWarnDeprecation();
	}

	/**
	 * Construct a map/reduce configuration for which reading of the default
	 * resources can be switched off.
	 * <p/>
	 * When {@code loadDefaults} is false, the new instance does not load
	 * resources from the default files.
	 * 
	 * @param loadDefaults
	 *            specifies whether to load from the default files
	 */
	public JobConf(boolean loadDefaults) {
		super(loadDefaults);
		this.checkAndWarnDeprecation();
	}

	/**
	 * Construct a map/reduce job configuration whose job jar is derived from
	 * the given class.
	 * 
	 * @param exampleClass
	 *            a class whose containing jar is used as the job's jar.
	 */
	public JobConf(Class exampleClass) {
		super();
		this.setJarByClass(exampleClass);
		this.checkAndWarnDeprecation();
	}

	/**
	 * Construct a map/reduce job configuration that inherits the settings of
	 * an existing configuration.
	 * 
	 * @param conf
	 *            a Configuration whose settings will be inherited.
	 */
	public JobConf(Configuration conf) {
		super(conf);
		this.checkAndWarnDeprecation();
	}

	/**
	 * Construct a map/reduce job configuration that inherits the settings of
	 * an existing configuration and takes its job jar from the given class.
	 * 
	 * @param conf
	 *            a Configuration whose settings will be inherited.
	 * @param exampleClass
	 *            a class whose containing jar is used as the job's jar.
	 */
	public JobConf(Configuration conf, Class exampleClass) {
		this(conf);
		this.setJarByClass(exampleClass);
	}

	/**
	 * Construct a map/reduce configuration and add the given job description
	 * file to it as a resource.
	 * 
	 * @param config
	 *            a Configuration-format XML job description file.
	 */
	public JobConf(Path config) {
		this.addResource(config);
		this.checkAndWarnDeprecation();
	}

	/**
	 * Construct a map/reduce configuration from a job description file named
	 * by a string. The string is wrapped in a {@link Path} and handed to
	 * {@link #JobConf(Path)}.
	 * 
	 * @param config
	 *            a Configuration-format XML job description file.
	 */
	public JobConf(String config) {
		this(new Path(config));
	}

	/**
	 * Logs a warning when the deprecated key
	 * {@link #MAPRED_TASK_MAXVMEM_PROPERTY} is present in this configuration,
	 * naming the two keys that replace it.
	 */
	private void checkAndWarnDeprecation() {
		if (get(JobConf.MAPRED_TASK_MAXVMEM_PROPERTY) == null) {
			return;
		}
		String warning = JobConf
				.deprecatedString(JobConf.MAPRED_TASK_MAXVMEM_PROPERTY)
				+ " Instead use "
				+ JobConf.MAPRED_JOB_MAP_MEMORY_MB_PROPERTY
				+ " and "
				+ JobConf.MAPRED_JOB_REDUCE_MEMORY_MB_PROPERTY;
		LOG.warn(warning);
	}

	/**
	 * Compute the number of slots required to run a single map task-attempt of
	 * this job.
	 * 
	 * @param slotSizePerMap
	 *            cluster-wide value of the amount of memory required to run a
	 *            map-task
	 * @return the number of slots required to run a single map task-attempt,
	 *         i.e. the job's map memory divided by the slot size, rounded
	 *         up; 1 if memory parameters are disabled.
	 */
	int computeNumSlotsPerMap(long slotSizePerMap) {
		if (slotSizePerMap == DISABLED_MEMORY_LIMIT) {
			return 1;
		}
		// Read the setting once; each call consults the configuration.
		long memoryForTask = getMemoryForMapTask();
		if (memoryForTask == DISABLED_MEMORY_LIMIT) {
			return 1;
		}
		// Divide in double precision: a float only has a 24-bit mantissa, so
		// casting large long values to float can round the quotient wrongly.
		return (int) Math.ceil((double) memoryForTask
				/ (double) slotSizePerMap);
	}

	/**
	 * Compute the number of slots required to run a single reduce task-attempt
	 * of this job.
	 * 
	 * @param slotSizePerReduce
	 *            cluster-wide value of the amount of memory required to run a
	 *            reduce-task
	 * @return the number of slots required to run a single reduce
	 *         task-attempt, i.e. the job's reduce memory divided by the slot
	 *         size, rounded up; 1 if memory parameters are disabled.
	 */
	int computeNumSlotsPerReduce(long slotSizePerReduce) {
		if (slotSizePerReduce == DISABLED_MEMORY_LIMIT) {
			return 1;
		}
		// Read the setting once; each call consults the configuration.
		long memoryForTask = getMemoryForReduceTask();
		if (memoryForTask == DISABLED_MEMORY_LIMIT) {
			return 1;
		}
		// Divide in double precision: a float only has a 24-bit mantissa, so
		// casting large long values to float can round the quotient wrongly.
		return (int) Math.ceil((double) memoryForTask
				/ (double) slotSizePerReduce);
	}

	/**
	 * Recursively deletes every directory returned by {@link #getLocalDirs()}
	 * from the local file system.
	 * 
	 * @deprecated Use MRAsyncDiskService.moveAndDeleteAllVolumes instead.
	 * @throws IOException
	 *             if the local file system cannot be obtained or a delete
	 *             fails.
	 * @see org.apache.hadoop.mapreduce.util.MRAsyncDiskService#cleanupAllVolumes()
	 */
	@Deprecated
	public void deleteLocalFiles() throws IOException {
		for (String localDir : getLocalDirs()) {
			FileSystem localFs = FileSystem.getLocal(this);
			localFs.delete(new Path(localDir), true);
		}
	}

	/**
	 * Recursively deletes the given sub-directory from each directory returned
	 * by {@link #getLocalDirs()}, on the local file system.
	 * 
	 * @param subdir
	 *            path, relative to each local directory, to delete.
	 * @throws IOException
	 *             if the local file system cannot be obtained or a delete
	 *             fails.
	 */
	public void deleteLocalFiles(String subdir) throws IOException {
		for (String localDir : getLocalDirs()) {
			FileSystem localFs = FileSystem.getLocal(this);
			localFs.delete(new Path(localDir, subdir), true);
		}
	}

	/**
	 * Get the user-defined <i>combiner</i> class used to combine map-outputs
	 * before being sent to the reducers. Typically the combiner is the same as
	 * the {@link Reducer} for the job i.e. {@link #getReducerClass()}.
	 * 
	 * @return the user-defined combiner class used to combine map-outputs, or
	 *         <code>null</code> if none is configured.
	 */
	public Class<? extends Reducer> getCombinerClass() {
		Class<? extends Reducer> combinerClass = getClass(
				"mapred.combiner.class", null, Reducer.class);
		return combinerClass;
	}

	/**
	 * Should the outputs of the maps be compressed? Defaults to
	 * <code>false</code>.
	 * 
	 * @return <code>true</code> if the outputs of the maps are to be
	 *         compressed, <code>false</code> otherwise.
	 */
	public boolean getCompressMapOutput() {
		boolean compress = getBoolean(JobContext.MAP_OUTPUT_COMPRESS, false);
		return compress;
	}

	/**
	 * Reads the value stored under {@link #MAPRED_TASK_MAXVMEM_PROPERTY} and
	 * converts it to MB by dividing by 1024 * 1024.
	 * 
	 * @return the converted value, or {@link #DISABLED_MEMORY_LIMIT} if the
	 *         key is unset or set to a negative value.
	 */
	private long getDeprecatedMemoryValue() {
		long configured = normalizeMemoryConfigValue(getLong(
				MAPRED_TASK_MAXVMEM_PROPERTY, DISABLED_MEMORY_LIMIT));
		if (configured == DISABLED_MEMORY_LIMIT) {
			return DISABLED_MEMORY_LIMIT;
		}
		return configured / (1024 * 1024);
	}

	/**
	 * Get the {@link InputFormat} implementation for the map-reduce job,
	 * defaults to {@link TextInputFormat} if not specified explicitly. A new
	 * instance is created through {@link ReflectionUtils} on every call.
	 * 
	 * @return the {@link InputFormat} implementation for the map-reduce job.
	 */
	public InputFormat getInputFormat() {
		Class<? extends InputFormat> formatClass = getClass(
				"mapred.input.format.class", TextInputFormat.class,
				InputFormat.class);
		return ReflectionUtils.newInstance(formatClass, this);
	}

	/**
	 * Get the user jar for the map-reduce job, as stored under
	 * {@link JobContext#JAR}.
	 * 
	 * @return the user jar for the map-reduce job.
	 */
	public String getJar() {
		String jar = get(JobContext.JAR);
		return jar;
	}

	/**
	 * Get the pattern for jar contents to unpack on the tasktracker.
	 * 
	 * @return the configured pattern, with
	 *         {@link #UNPACK_JAR_PATTERN_DEFAULT} as the fallback.
	 */
	public Pattern getJarUnpackPattern() {
		Pattern unpackPattern = getPattern(JobContext.JAR_UNPACK_PATTERN,
				UNPACK_JAR_PATTERN_DEFAULT);
		return unpackPattern;
	}

	/**
	 * Get the uri to be invoked in order to send a notification after the job
	 * has completed (success/failure).
	 * 
	 * @return the job end notification uri, <code>null</code> if it hasn't been
	 *         set.
	 * @see #setJobEndNotificationURI(String)
	 */
	public String getJobEndNotificationURI() {
		String notificationUri = get(JobContext.END_NOTIFICATION_URL);
		return notificationUri;
	}

	/**
	 * Get the job-specific shared directory for use as scratch space.
	 * 
	 * <p>
	 * When a job starts, a shared directory is created at location <code>
	 * ${mapreduce.cluster.local.dir}/taskTracker/$user/jobcache/$jobid/work/ </code>
	 * . This directory is exposed to the users through
	 * <code>mapreduce.job.local.dir </code>. So, the tasks can use this space
	 * as scratch space and share files among them.
	 * </p>
	 * This value is also available as a System property.
	 * 
	 * @return The localized job specific shared directory
	 */
	public String getJobLocalDir() {
		String jobLocalDir = get(JobContext.JOB_LOCAL_DIR);
		return jobLocalDir;
	}

	/**
	 * Get the user-specified job name. This is only used to identify the job to
	 * the user.
	 * 
	 * @return the job's name, defaulting to "".
	 */
	public String getJobName() {
		String jobName = get(JobContext.JOB_NAME, "");
		return jobName;
	}

	/**
	 * Get the {@link JobPriority} for this job.
	 * 
	 * @return the {@link JobPriority} for this job, or
	 *         {@link JobPriority#NORMAL} when no priority is configured.
	 */
	public JobPriority getJobPriority() {
		String configured = get(JobContext.PRIORITY);
		return (configured != null) ? JobPriority.valueOf(configured)
				: JobPriority.NORMAL;
	}

	/**
	 * Should the temporary files for failed tasks be kept? Defaults to
	 * <code>false</code>.
	 * 
	 * @return should the files be kept?
	 */
	public boolean getKeepFailedTaskFiles() {
		boolean keep = getBoolean(JobContext.PRESERVE_FAILED_TASK_FILES, false);
		return keep;
	}

	/**
	 * Get the regular expression that is matched against the task names to see
	 * if we need to keep the files.
	 * 
	 * @return the pattern as a string, if it was set, otherwise null.
	 */
	public String getKeepTaskFilesPattern() {
		String keepPattern = get(JobContext.PRESERVE_FILES_PATTERN);
		return keepPattern;
	}

	/**
	 * Get the {@link KeyFieldBasedComparator} options.
	 * 
	 * @return the value stored under
	 *         {@link KeyFieldBasedComparator#COMPARATOR_OPTIONS}.
	 */
	public String getKeyFieldComparatorOption() {
		String comparatorOptions = get(KeyFieldBasedComparator.COMPARATOR_OPTIONS);
		return comparatorOptions;
	}

	/**
	 * Get the {@link KeyFieldBasedPartitioner} options.
	 * 
	 * @return the value stored under
	 *         {@link KeyFieldBasedPartitioner#PARTITIONER_OPTIONS}.
	 */
	public String getKeyFieldPartitionerOption() {
		String partitionerOptions = get(KeyFieldBasedPartitioner.PARTITIONER_OPTIONS);
		return partitionerOptions;
	}

	/**
	 * Get the local directories configured under {@link MRConfig#LOCAL_DIR},
	 * as produced by <code>getTrimmedStrings</code>.
	 * 
	 * @return the configured local directories.
	 * @throws IOException
	 *             declared for callers; not thrown by this implementation
	 *             itself.
	 */
	public String[] getLocalDirs() throws IOException {
		String[] localDirs = getTrimmedStrings(MRConfig.LOCAL_DIR);
		return localDirs;
	}

	/**
	 * Constructs a local file name. Files are distributed among configured
	 * local directories.
	 * 
	 * @param pathString
	 *            the path to resolve against the directories configured under
	 *            {@link MRConfig#LOCAL_DIR}.
	 * @return the resulting local path.
	 */
	public Path getLocalPath(String pathString) throws IOException {
		Path localPath = getLocalPath(MRConfig.LOCAL_DIR, pathString);
		return localPath;
	}

	/**
	 * Get the map task's debug script.
	 * 
	 * @return the debug script for the mapred job for failed map tasks.
	 * @see #setMapDebugScript(String)
	 */
	public String getMapDebugScript() {
		String debugScript = get(JobContext.MAP_DEBUG_SCRIPT);
		return debugScript;
	}

	/**
	 * Get the {@link CompressionCodec} for compressing the map outputs.
	 * 
	 * @param defaultValue
	 *            the {@link CompressionCodec} to return if not set
	 * @return the {@link CompressionCodec} class that should be used to
	 *         compress the map outputs.
	 * @throws IllegalArgumentException
	 *             if the class was specified, but not found
	 */
	public Class<? extends CompressionCodec> getMapOutputCompressorClass(
			Class<? extends CompressionCodec> defaultValue) {
		String name = get(JobContext.MAP_OUTPUT_COMPRESS_CODEC);
		if (name == null) {
			// No codec configured: fall back to the caller's default.
			return defaultValue;
		}
		try {
			return getClassByName(name).asSubclass(CompressionCodec.class);
		} catch (ClassNotFoundException e) {
			throw new IllegalArgumentException("Compression codec " + name
					+ " was not found.", e);
		}
	}

	/**
	 * Get the key class for the map output data. If it is not set, use the
	 * (final) output key class. This allows the map output key class to be
	 * different than the final output key class.
	 * 
	 * @return the map output key class.
	 */
	public Class<?> getMapOutputKeyClass() {
		Class<?> mapKeyClass = getClass(JobContext.MAP_OUTPUT_KEY_CLASS, null,
				Object.class);
		if (mapKeyClass != null) {
			return mapKeyClass;
		}
		return getOutputKeyClass();
	}

	/**
	 * Get the value class for the map output data. If it is not set, use the
	 * (final) output value class. This allows the map output value class to be
	 * different than the final output value class.
	 * 
	 * @return the map output value class.
	 */
	public Class<?> getMapOutputValueClass() {
		Class<?> mapValueClass = getClass(JobContext.MAP_OUTPUT_VALUE_CLASS,
				null, Object.class);
		if (mapValueClass != null) {
			return mapValueClass;
		}
		return getOutputValueClass();
	}

	/**
	 * Get the {@link Mapper} class for the job, defaulting to
	 * {@link IdentityMapper}.
	 * 
	 * @return the {@link Mapper} class for the job.
	 */
	public Class<? extends Mapper> getMapperClass() {
		Class<? extends Mapper> mapperClass = getClass("mapred.mapper.class",
				IdentityMapper.class, Mapper.class);
		return mapperClass;
	}

	/**
	 * Get the {@link MapRunnable} class for the job, defaulting to
	 * {@link MapRunner}.
	 * 
	 * @return the {@link MapRunnable} class for the job.
	 */
	public Class<? extends MapRunnable> getMapRunnerClass() {
		Class<? extends MapRunnable> runnerClass = getClass(
				"mapred.map.runner.class", MapRunner.class, MapRunnable.class);
		return runnerClass;
	}

	/**
	 * Should speculative execution be used for this job for map tasks? Defaults
	 * to <code>true</code>.
	 * 
	 * @return <code>true</code> if speculative execution is to be used for
	 *         this job for map tasks, <code>false</code> otherwise.
	 */
	public boolean getMapSpeculativeExecution() {
		boolean speculative = getBoolean(JobContext.MAP_SPECULATIVE, true);
		return speculative;
	}

	/**
	 * Get the configured number of maximum attempts that will be made to run a
	 * map task, as specified by the <code>mapreduce.map.maxattempts</code>
	 * property. If this property is not already set, the default is 4 attempts.
	 * 
	 * @return the max number of attempts per map task.
	 */
	public int getMaxMapAttempts() {
		int maxAttempts = getInt(JobContext.MAP_MAX_ATTEMPTS, 4);
		return maxAttempts;
	}

	/**
	 * Get the maximum percentage of map tasks that can fail without the job
	 * being aborted.
	 * 
	 * Each map task is executed a minimum of {@link #getMaxMapAttempts()}
	 * attempts before being declared as <i>failed</i>.
	 * 
	 * Defaults to <code>zero</code>, i.e. <i>any</i> failed map-task results in
	 * the job being declared as {@link JobStatus#FAILED}.
	 * 
	 * @return the maximum percentage of map tasks that can fail without the job
	 *         being aborted.
	 */
	public int getMaxMapTaskFailuresPercent() {
		int maxFailuresPercent = getInt(JobContext.MAP_FAILURES_MAX_PERCENT, 0);
		return maxFailuresPercent;
	}

	/**
	 * Logs a deprecation warning and returns -1; no configuration is read.
	 * 
	 * @return always -1.
	 * @deprecated this variable is deprecated and no longer in use.
	 */
	@Deprecated
	public long getMaxPhysicalMemoryForTask() {
		String warning = "The API getMaxPhysicalMemoryForTask() is deprecated."
				+ " Refer to the APIs getMemoryForMapTask() and"
				+ " getMemoryForReduceTask() for details.";
		LOG.warn(warning);
		return -1;
	}

	/**
	 * Get the configured number of maximum attempts that will be made to run a
	 * reduce task, as specified by the
	 * <code>mapreduce.reduce.maxattempts</code> property. If this property is
	 * not already set, the default is 4 attempts.
	 * 
	 * @return the max number of attempts per reduce task.
	 */
	public int getMaxReduceAttempts() {
		int maxAttempts = getInt(JobContext.REDUCE_MAX_ATTEMPTS, 4);
		return maxAttempts;
	}

	/**
	 * Get the maximum percentage of reduce tasks that can fail without the job
	 * being aborted.
	 * 
	 * Each reduce task is executed a minimum of {@link #getMaxReduceAttempts()}
	 * attempts before being declared as <i>failed</i>.
	 * 
	 * Defaults to <code>zero</code>, i.e. <i>any</i> failed reduce-task results
	 * in the job being declared as {@link JobStatus#FAILED}.
	 * 
	 * @return the maximum percentage of reduce tasks that can fail without the
	 *         job being aborted.
	 */
	public int getMaxReduceTaskFailuresPercent() {
		int maxFailuresPercent = getInt(JobContext.REDUCE_FAILURES_MAXPERCENT, 0);
		return maxFailuresPercent;
	}

	/**
	 * Expert: Get the maximum no. of failures of a given job per tasktracker.
	 * If the no. of task failures exceeds this, the tasktracker is
	 * <i>blacklisted</i> for this job. Defaults to 4.
	 * 
	 * @return the maximum no. of failures of a given job per tasktracker.
	 */
	public int getMaxTaskFailuresPerTracker() {
		int maxFailures = getInt(JobContext.MAX_TASK_FAILURES_PER_TRACKER, 4);
		return maxFailures;
	}

	/**
	 * Get the memory required to run a task of this job, in bytes. See
	 * {@link #MAPRED_TASK_MAXVMEM_PROPERTY}
	 * <p/>
	 * This method is deprecated. Now, different memory limits can be set for
	 * map and reduce tasks of a job, in MB.
	 * <p/>
	 * For backward compatibility, if the job configuration sets the key
	 * {@link #MAPRED_TASK_MAXVMEM_PROPERTY} to a value different from
	 * {@link #DISABLED_MEMORY_LIMIT}, that value is returned. Otherwise, this
	 * method will return the larger of the values returned by
	 * {@link #getMemoryForMapTask()} and {@link #getMemoryForReduceTask()}
	 * after converting them into bytes.
	 * 
	 * @return Memory required to run a task of this job, in bytes, or
	 *         {@link #DISABLED_MEMORY_LIMIT}, if unset.
	 * @see #setMaxVirtualMemoryForTask(long)
	 * @deprecated Use {@link #getMemoryForMapTask()} and
	 *             {@link #getMemoryForReduceTask()}
	 */
	@Deprecated
	public long getMaxVirtualMemoryForTask() {
		LOG.warn("getMaxVirtualMemoryForTask() is deprecated. "
				+ "Instead use getMemoryForMapTask() and getMemoryForReduceTask()");

		// The legacy key wins whenever it holds a non-negative value.
		long legacyBytes = normalizeMemoryConfigValue(getLong(
				MAPRED_TASK_MAXVMEM_PROPERTY, DISABLED_MEMORY_LIMIT));
		if (legacyBytes != DISABLED_MEMORY_LIMIT) {
			return legacyBytes;
		}

		long largerMb = normalizeMemoryConfigValue(Math.max(
				getMemoryForMapTask(), getMemoryForReduceTask()));
		if (largerMb == DISABLED_MEMORY_LIMIT) {
			return DISABLED_MEMORY_LIMIT;
		}
		// Convert MB to bytes.
		return largerMb * (1024 * 1024);
	}

	/**
	 * Get memory required to run a map task of the job, in MB.
	 * 
	 * If a value is specified in the configuration, it is returned. Else, it
	 * returns {@link #DISABLED_MEMORY_LIMIT}.
	 * <p/>
	 * For backward compatibility, if the job configuration sets the key
	 * {@link #MAPRED_TASK_MAXVMEM_PROPERTY} to a value different from
	 * {@link #DISABLED_MEMORY_LIMIT}, that value will be used after converting
	 * it from bytes to MB.
	 * 
	 * @return memory required to run a map task of the job, in MB, or
	 *         {@link #DISABLED_MEMORY_LIMIT} if unset.
	 */
	public long getMemoryForMapTask() {
		long legacyMb = getDeprecatedMemoryValue();
		if (legacyMb != DISABLED_MEMORY_LIMIT) {
			return legacyMb;
		}
		long configuredMb = getLong(JobConf.MAPRED_JOB_MAP_MEMORY_MB_PROPERTY,
				DISABLED_MEMORY_LIMIT);
		return normalizeMemoryConfigValue(configuredMb);
	}

	/**
	 * Get memory required to run a reduce task of the job, in MB.
	 * 
	 * If a value is specified in the configuration, it is returned. Else, it
	 * returns {@link #DISABLED_MEMORY_LIMIT}.
	 * <p/>
	 * For backward compatibility, if the job configuration sets the key
	 * {@link #MAPRED_TASK_MAXVMEM_PROPERTY} to a value different from
	 * {@link #DISABLED_MEMORY_LIMIT}, that value will be used after converting
	 * it from bytes to MB.
	 * 
	 * @return memory required to run a reduce task of the job, in MB, or
	 *         {@link #DISABLED_MEMORY_LIMIT} if unset.
	 */
	public long getMemoryForReduceTask() {
		long legacyMb = getDeprecatedMemoryValue();
		if (legacyMb != DISABLED_MEMORY_LIMIT) {
			return legacyMb;
		}
		long configuredMb = getLong(
				JobConf.MAPRED_JOB_REDUCE_MEMORY_MB_PROPERTY,
				DISABLED_MEMORY_LIMIT);
		return normalizeMemoryConfigValue(configuredMb);
	}

	/**
	 * Get the configured number of map tasks for this job. Defaults to
	 * <code>1</code>.
	 * 
	 * @return the number of map tasks for this job.
	 */
	public int getNumMapTasks() {
		int numMaps = getInt(JobContext.NUM_MAPS, 1);
		return numMaps;
	}

	/**
	 * Get the configured number of reduce tasks for this job. Defaults to
	 * <code>1</code>.
	 * 
	 * @return the number of reduce tasks for this job.
	 */
	public int getNumReduceTasks() {
		int numReduces = getInt(JobContext.NUM_REDUCES, 1);
		return numReduces;
	}

	/**
	 * Get the number of tasks that a spawned JVM should execute. Defaults to
	 * <code>1</code>.
	 * 
	 * @return the configured number of tasks per JVM.
	 */
	public int getNumTasksToExecutePerJvm() {
		int tasksPerJvm = getInt(JobContext.JVM_NUMTASKS_TORUN, 1);
		return tasksPerJvm;
	}

	/**
	 * Get the {@link OutputCommitter} implementation for the map-reduce job,
	 * defaults to {@link FileOutputCommitter} if not specified explicitly.
	 * 
	 * @return the {@link OutputCommitter} implementation for the map-reduce
	 *         job.
	 */
	public OutputCommitter getOutputCommitter() {
		// Resolve the configured committer class, defaulting to
		// FileOutputCommitter, then instantiate it against this configuration.
		final Class<? extends OutputCommitter> committerClass = getClass(
				"mapred.output.committer.class", FileOutputCommitter.class,
				OutputCommitter.class);
		return ReflectionUtils.newInstance(committerClass, this);
	}

	/**
	 * Get the {@link OutputFormat} implementation for the map-reduce job,
	 * defaults to {@link TextOutputFormat} if not specified explicitly.
	 * 
	 * @return the {@link OutputFormat} implementation for the map-reduce job.
	 */
	public OutputFormat getOutputFormat() {
		// Resolve the configured format class, defaulting to TextOutputFormat,
		// then instantiate it against this configuration.
		final Class<? extends OutputFormat> formatClass = getClass(
				"mapred.output.format.class", TextOutputFormat.class,
				OutputFormat.class);
		return ReflectionUtils.newInstance(formatClass, this);
	}

	/**
	 * Get the key class for the job output data.
	 * 
	 * @return the key class for the job output data.
	 */
	public Class<?> getOutputKeyClass() {
		// LongWritable is the fallback when no output key class is configured.
		final Class<?> keyClass = getClass(JobContext.OUTPUT_KEY_CLASS,
				LongWritable.class, Object.class);
		return keyClass;
	}

	/**
	 * Get the {@link RawComparator} comparator used to compare keys.
	 * 
	 * @return the {@link RawComparator} comparator used to compare keys.
	 */
	public RawComparator getOutputKeyComparator() {
		final Class<? extends RawComparator> comparatorClass = getClass(
				JobContext.KEY_COMPARATOR, null, RawComparator.class);
		if (comparatorClass == null) {
			// No explicit comparator: use the WritableComparator registered
			// for the map output key class.
			final Class<? extends WritableComparable> keyClass = getMapOutputKeyClass()
					.asSubclass(WritableComparable.class);
			return WritableComparator.get(keyClass);
		}
		return ReflectionUtils.newInstance(comparatorClass, this);
	}

	/**
	 * Get the value class for job outputs.
	 * 
	 * @return the value class for job outputs.
	 */
	public Class<?> getOutputValueClass() {
		// Text is the fallback when no output value class is configured.
		final Class<?> valueClass = getClass(JobContext.OUTPUT_VALUE_CLASS,
				Text.class, Object.class);
		return valueClass;
	}

	/**
	 * Get the user defined {@link RawComparator} comparator for grouping
	 * keys of inputs to the reduce. Falls back to
	 * {@link #getOutputKeyComparator()} when none is configured.
	 * 
	 * @return comparator set by the user for grouping values.
	 * @see #setOutputValueGroupingComparator(Class)
	 */
	public RawComparator getOutputValueGroupingComparator() {
		final Class<? extends RawComparator> groupingClass = getClass(
				JobContext.GROUP_COMPARATOR_CLASS, null, RawComparator.class);
		if (groupingClass != null) {
			return ReflectionUtils.newInstance(groupingClass, this);
		}
		// Nothing configured: group with the same comparator used for sorting.
		return getOutputKeyComparator();
	}

	/**
	 * Get the {@link Partitioner} used to partition {@link Mapper}-outputs to
	 * be sent to the {@link Reducer}s.
	 * 
	 * @return the {@link Partitioner} used to partition map-outputs.
	 */
	public Class<? extends Partitioner> getPartitionerClass() {
		// HashPartitioner is the fallback when no partitioner is configured.
		final Class<? extends Partitioner> partitionerClass = getClass(
				"mapred.partitioner.class", HashPartitioner.class,
				Partitioner.class);
		return partitionerClass;
	}

	/**
	 * Get whether the task profiling is enabled.
	 * 
	 * @return true if some tasks will be profiled
	 */
	public boolean getProfileEnabled() {
		// Profiling is off unless JobContext.TASK_PROFILE says otherwise.
		final boolean profilingOn = getBoolean(JobContext.TASK_PROFILE, false);
		return profilingOn;
	}

	/**
	 * Get the profiler configuration arguments.
	 * 
	 * The default value for this property is
	 * "-agentlib:hprof=cpu=samples,heap=sites,force=n,thread=y,verbose=n,file=%s"
	 * 
	 * @return the parameters to pass to the task child to configure profiling
	 */
	public String getProfileParams() {
		// Default hprof agent arguments, used when nothing is configured.
		final String defaultParams = "-agentlib:hprof=cpu=samples,heap=sites,force=n,thread=y,"
				+ "verbose=n,file=%s";
		return get(JobContext.TASK_PROFILE_PARAMS, defaultParams);
	}

	/**
	 * Get the range of maps or reduces to profile.
	 * 
	 * @param isMap
	 *            is the task a map?
	 * @return the task ranges
	 */
	public IntegerRanges getProfileTaskRange(boolean isMap) {
		// Map and reduce ranges live under separate keys.
		final String rangeKey;
		if (isMap) {
			rangeKey = JobContext.NUM_MAP_PROFILES;
		} else {
			rangeKey = JobContext.NUM_REDUCE_PROFILES;
		}
		// "0-2" is the range used when the key is not configured.
		return getRange(rangeKey, "0-2");
	}

	/**
	 * Return the name of the queue to which this job is submitted. Defaults to
	 * 'default'.
	 * 
	 * @return name of the queue
	 */
	public String getQueueName() {
		// DEFAULT_QUEUE_NAME is returned when no queue has been configured.
		final String queue = get(JobContext.QUEUE_NAME, DEFAULT_QUEUE_NAME);
		return queue;
	}

	/**
	 * Get the reduce task's debug Script
	 * 
	 * @return the debug script for the mapred job for failed reduce tasks.
	 * @see #setReduceDebugScript(String)
	 */
	public String getReduceDebugScript() {
		// No default is supplied to the lookup for this key.
		final String script = get(JobContext.REDUCE_DEBUG_SCRIPT);
		return script;
	}

	/**
	 * Get the {@link Reducer} class for the job.
	 * 
	 * @return the {@link Reducer} class for the job.
	 */
	public Class<? extends Reducer> getReducerClass() {
		// IdentityReducer is the fallback when no reducer is configured.
		final Class<? extends Reducer> reducerClass = getClass(
				"mapred.reducer.class", IdentityReducer.class, Reducer.class);
		return reducerClass;
	}

	/**
	 * Should speculative execution be used for this job for reduce tasks?
	 * Defaults to <code>true</code>.
	 * 
	 * @return <code>true</code> if speculative execution be used for reduce
	 *         tasks for this job, <code>false</code> otherwise.
	 */
	public boolean getReduceSpeculativeExecution() {
		// Enabled unless JobContext.REDUCE_SPECULATIVE says otherwise.
		final boolean speculative = getBoolean(JobContext.REDUCE_SPECULATIVE,
				true);
		return speculative;
	}

	/**
	 * Get the user-specified session identifier. The default is the empty
	 * string.
	 * 
	 * The session identifier is used to tag metric data that is reported to
	 * some performance metrics system via the org.apache.hadoop.metrics API.
	 * The session identifier is intended, in particular, for use by
	 * Hadoop-On-Demand (HOD) which allocates a virtual Hadoop cluster
	 * dynamically and transiently. HOD will set the session identifier by
	 * modifying the mapred-site.xml file before starting the cluster.
	 * 
	 * When not running under HOD, this identifier is expected to remain set to
	 * the empty string.
	 * 
	 * @return the session identifier, defaulting to "".
	 */
	@Deprecated
	public String getSessionId() {
		// The empty string stands in for an unset session id.
		final String sessionId = get("session.id", "");
		return sessionId;
	}

	/**
	 * Should speculative execution be used for this job? Defaults to
	 * <code>true</code>.
	 * 
	 * @return <code>true</code> if speculative execution be used for this job,
	 *         <code>false</code> otherwise.
	 */
	public boolean getSpeculativeExecution() {
		// True as soon as either side has speculation enabled; the reduce
		// setting is consulted only when the map setting is off.
		if (getMapSpeculativeExecution()) {
			return true;
		}
		return getReduceSpeculativeExecution();
	}

	/**
	 * Should the framework use the new context-object code for running the
	 * mapper?
	 * 
	 * @return true, if the new api should be used
	 */
	public boolean getUseNewMapper() {
		// The new mapper API is not used unless the flag is set.
		final boolean useNewApi = getBoolean("mapred.mapper.new-api", false);
		return useNewApi;
	}

	/**
	 * Should the framework use the new context-object code for running the
	 * reducer?
	 * 
	 * @return true, if the new api should be used
	 */
	public boolean getUseNewReducer() {
		// The new reducer API is not used unless the flag is set.
		final boolean useNewApi = getBoolean("mapred.reducer.new-api", false);
		return useNewApi;
	}

	/**
	 * Get the reported username for this job.
	 * 
	 * @return the username
	 */
	public String getUser() {
		// No default is supplied to the lookup for this key.
		final String userName = get(JobContext.USER_NAME);
		return userName;
	}

	/**
	 * Get the current working directory for the default file system.
	 * 
	 * @return the directory name.
	 */
	public Path getWorkingDirectory() {
		final String configured = get(JobContext.WORKING_DIR);
		if (configured != null) {
			return new Path(configured);
		}
		// Nothing configured yet: ask the file system for its working
		// directory and store it so later calls take the branch above.
		try {
			final Path fsWorkingDir = FileSystem.get(this).getWorkingDirectory();
			set(JobContext.WORKING_DIR, fsWorkingDir.toString());
			return fsWorkingDir;
		} catch (IOException ioe) {
			// Surface the I/O failure as unchecked, keeping the cause.
			throw new RuntimeException(ioe);
		}
	}

	/**
	 * Set the user-defined <i>combiner</i> class used to combine map-outputs
	 * before being sent to the reducers.
	 * 
	 * <p>
	 * The combiner is an application-specified aggregation operation, which can
	 * help cut down the amount of data transferred between the {@link Mapper}
	 * and the {@link Reducer}, leading to better performance.
	 * </p>
	 * 
	 * <p>
	 * The framework may invoke the combiner 0, 1, or multiple times, in both
	 * the mapper and reducer tasks. In general, the combiner is called as the
	 * sort/merge result is written to disk. The combiner must:
	 * <ul>
	 * <li>be side-effect free</li>
	 * <li>have the same input and output key types and the same input and
	 * output value types</li>
	 * </ul>
	 * </p>
	 * 
	 * <p>
	 * Typically the combiner is same as the <code>Reducer</code> for the job
	 * i.e. {@link #setReducerClass(Class)}.
	 * </p>
	 * 
	 * @param theClass
	 *            the user-defined combiner class used to combine map-outputs.
	 */
	public void setCombinerClass(Class<? extends Reducer> theClass) {
		// Stored as an implementation of the Reducer interface.
		final String combinerKey = "mapred.combiner.class";
		setClass(combinerKey, theClass, Reducer.class);
	}

	/**
	 * Should the map outputs be compressed before transfer? Uses the
	 * SequenceFile compression.
	 * 
	 * @param compress
	 *            should the map outputs be compressed?
	 */
	public void setCompressMapOutput(boolean compress) {
		// Recorded under JobContext.MAP_OUTPUT_COMPRESS.
		final String compressKey = JobContext.MAP_OUTPUT_COMPRESS;
		setBoolean(compressKey, compress);
	}

	/**
	 * Set the {@link InputFormat} implementation for the map-reduce job.
	 * 
	 * @param theClass
	 *            the {@link InputFormat} implementation for the map-reduce job.
	 */
	public void setInputFormat(Class<? extends InputFormat> theClass) {
		// Stored as an implementation of the InputFormat interface.
		final String inputFormatKey = "mapred.input.format.class";
		setClass(inputFormatKey, theClass, InputFormat.class);
	}

	/**
	 * Set the user jar for the map-reduce job.
	 * 
	 * @param jar
	 *            the user jar for the map-reduce job.
	 */
	public void setJar(String jar) {
		// Recorded under JobContext.JAR.
		final String jarKey = JobContext.JAR;
		set(jarKey, jar);
	}

	/**
	 * Set the job's jar file by finding an example class location.
	 * 
	 * @param cls
	 *            the example class.
	 */
	public void setJarByClass(Class cls) {
		final String containingJar = findContainingJar(cls);
		if (containingJar == null) {
			// No jar located for the class: leave the job jar untouched.
			return;
		}
		setJar(containingJar);
	}

	/**
	 * Set the uri to be invoked in-order to send a notification after the job
	 * has completed (success/failure).
	 * 
	 * <p>
	 * The uri can contain 2 special parameters: <tt>$jobId</tt> and
	 * <tt>$jobStatus</tt>. Those, if present, are replaced by the job's
	 * identifier and completion-status respectively.
	 * </p>
	 * 
	 * <p>
	 * This is typically used by application-writers to implement chaining of
	 * Map-Reduce jobs in an <i>asynchronous manner</i>.
	 * </p>
	 * 
	 * @param uri
	 *            the job end notification uri
	 * @see JobStatus
	 * @see <a href="{@docRoot}/org/apache/hadoop/mapred/JobClient.html#JobCompletionAndChaining">
	 *      Job Completion and Chaining</a>
	 */
	public void setJobEndNotificationURI(String uri) {
		// Recorded under JobContext.END_NOTIFICATION_URL.
		final String notificationKey = JobContext.END_NOTIFICATION_URL;
		set(notificationKey, uri);
	}

	/**
	 * Set the user-specified job name.
	 * 
	 * @param name
	 *            the job's new name.
	 */
	public void setJobName(String name) {
		// Recorded under JobContext.JOB_NAME.
		final String nameKey = JobContext.JOB_NAME;
		set(nameKey, name);
	}

	/**
	 * Set {@link JobPriority} for this job.
	 * 
	 * @param prio
	 *            the {@link JobPriority} for this job.
	 */
	public void setJobPriority(JobPriority prio) {
		// The priority is persisted through its toString() form.
		final String priorityName = prio.toString();
		set(JobContext.PRIORITY, priorityName);
	}

	/**
	 * Set whether the framework should keep the intermediate files for failed
	 * tasks.
	 * 
	 * @param keep
	 *            <code>true</code> if framework should keep the intermediate
	 *            files for failed tasks, <code>false</code> otherwise.
	 * 
	 */
	public void setKeepFailedTaskFiles(boolean keep) {
		// Recorded under JobContext.PRESERVE_FAILED_TASK_FILES.
		final String preserveKey = JobContext.PRESERVE_FAILED_TASK_FILES;
		setBoolean(preserveKey, keep);
	}

	/**
	 * Set a regular expression for task names that should be kept. The regular
	 * expression ".*_m_000123_0" would keep the files for the first instance of
	 * map 123 that ran.
	 * 
	 * @param pattern
	 *            the java.util.regex.Pattern to match against the task names.
	 */
	public void setKeepTaskFilesPattern(String pattern) {
		// The pattern is stored as given; it is not compiled here.
		final String patternKey = JobContext.PRESERVE_FILES_PATTERN;
		set(patternKey, pattern);
	}

	/**
	 * Set the {@link KeyFieldBasedComparator} options used to compare keys.
	 * 
	 * @param keySpec
	 *            the key specification of the form -k pos1[,pos2], where, pos
	 *            is of the form f[.c][opts], where f is the number of the key
	 *            field to use, and c is the number of the first character from
	 *            the beginning of the field. Fields and character posns are
	 *            numbered starting with 1; a character position of zero in pos2
	 *            indicates the field's last character. If '.c' is omitted from
	 *            pos1, it defaults to 1 (the beginning of the field); if
	 *            omitted from pos2, it defaults to 0 (the end of the field).
	 *            opts are ordering options. The supported options are: -n,
	 *            (Sort numerically) -r, (Reverse the result of comparison)
	 */
	public void setKeyFieldComparatorOptions(String keySpec) {
		// Supplying options also installs KeyFieldBasedComparator as the
		// key comparator for the job.
		this.setOutputKeyComparatorClass(KeyFieldBasedComparator.class);
		final String optionsKey = KeyFieldBasedComparator.COMPARATOR_OPTIONS;
		this.set(optionsKey, keySpec);
	}

	/**
	 * Set the {@link KeyFieldBasedPartitioner} options used for
	 * {@link Partitioner}
	 * 
	 * @param keySpec
	 *            the key specification of the form -k pos1[,pos2], where, pos
	 *            is of the form f[.c][opts], where f is the number of the key
	 *            field to use, and c is the number of the first character from
	 *            the beginning of the field. Fields and character posns are
	 *            numbered starting with 1; a character position of zero in pos2
	 *            indicates the field's last character. If '.c' is omitted from
	 *            pos1, it defaults to 1 (the beginning of the field); if
	 *            omitted from pos2, it defaults to 0 (the end of the field).
	 */
	public void setKeyFieldPartitionerOptions(String keySpec) {
		// Supplying options also installs KeyFieldBasedPartitioner as the
		// partitioner for the job.
		this.setPartitionerClass(KeyFieldBasedPartitioner.class);
		final String optionsKey = KeyFieldBasedPartitioner.PARTITIONER_OPTIONS;
		this.set(optionsKey, keySpec);
	}

	/**
	 * Set the debug script to run when the map tasks fail.
	 * 
	 * <p>
	 * The debug script can aid debugging of failed map tasks. The script is
	 * given task's stdout, stderr, syslog, jobconf files as arguments.
	 * </p>
	 * 
	 * <p>
	 * The debug command, run on the node where the map failed, is:
	 * </p>
	 * <p>
	 * 
	 * <pre>
	 * <blockquote> 
	 * $script $stdout $stderr $syslog $jobconf.
	 * </blockquote>
	 * </pre>
	 * 
	 * </p>
	 * 
	 * <p>
	 * The script file is distributed through {@link DistributedCache} APIs. The
	 * script needs to be symlinked.
	 * </p>
	 * 
	 * <p>
	 * Here is an example on how to submit a script
	 * <p>
	 * <blockquote>
	 * 
	 * <pre>
	 * job.setMapDebugScript(&quot;./myscript&quot;);
	 * DistributedCache.createSymlink(job);
	 * DistributedCache.addCacheFile(&quot;/debug/scripts/myscript#myscript&quot;);
	 * </pre>
	 * 
	 * </blockquote>
	 * </p>
	 * 
	 * @param mDbgScript
	 *            the script name
	 */
	public void setMapDebugScript(String mDbgScript) {
		// Recorded under JobContext.MAP_DEBUG_SCRIPT.
		final String scriptKey = JobContext.MAP_DEBUG_SCRIPT;
		set(scriptKey, mDbgScript);
	}

	/**
	 * Set the given class as the {@link CompressionCodec} for the map outputs.
	 * 
	 * @param codecClass
	 *            the {@link CompressionCodec} class that will compress the map
	 *            outputs.
	 */
	public void setMapOutputCompressorClass(
			Class<? extends CompressionCodec> codecClass) {
		// Choosing a codec also switches map-output compression on.
		this.setCompressMapOutput(true);
		final String codecKey = JobContext.MAP_OUTPUT_COMPRESS_CODEC;
		this.setClass(codecKey, codecClass, CompressionCodec.class);
	}

	/**
	 * Set the key class for the map output data. This allows the user to
	 * specify the map output key class to be different than the final output
	 * key class.
	 * 
	 * @param theClass
	 *            the map output key class.
	 */
	public void setMapOutputKeyClass(Class<?> theClass) {
		// Object.class as the interface: any class is accepted.
		final String keyClassKey = JobContext.MAP_OUTPUT_KEY_CLASS;
		setClass(keyClassKey, theClass, Object.class);
	}

	/**
	 * Set the value class for the map output data. This allows the user to
	 * specify the map output value class to be different than the final output
	 * value class.
	 * 
	 * @param theClass
	 *            the map output value class.
	 */
	public void setMapOutputValueClass(Class<?> theClass) {
		// Object.class as the interface: any class is accepted.
		final String valueClassKey = JobContext.MAP_OUTPUT_VALUE_CLASS;
		setClass(valueClassKey, theClass, Object.class);
	}

	/**
	 * Set the {@link Mapper} class for the job.
	 * 
	 * @param theClass
	 *            the {@link Mapper} class for the job.
	 */
	public void setMapperClass(Class<? extends Mapper> theClass) {
		// Stored as an implementation of the Mapper interface.
		final String mapperKey = "mapred.mapper.class";
		setClass(mapperKey, theClass, Mapper.class);
	}

	/**
	 * Expert: Set the {@link MapRunnable} class for the job.
	 * 
	 * Typically used to exert greater control on {@link Mapper}s.
	 * 
	 * @param theClass
	 *            the {@link MapRunnable} class for the job.
	 */
	public void setMapRunnerClass(Class<? extends MapRunnable> theClass) {
		// Stored as an implementation of the MapRunnable interface.
		final String runnerKey = "mapred.map.runner.class";
		setClass(runnerKey, theClass, MapRunnable.class);
	}

	/**
	 * Turn speculative execution on or off for this job for map tasks.
	 * 
	 * @param speculativeExecution
	 *            <code>true</code> if speculative execution should be turned on
	 *            for map tasks, else <code>false</code>.
	 */
	public void setMapSpeculativeExecution(boolean speculativeExecution) {
		// Recorded under JobContext.MAP_SPECULATIVE.
		final String speculativeKey = JobContext.MAP_SPECULATIVE;
		setBoolean(speculativeKey, speculativeExecution);
	}

	/**
	 * Expert: Set the number of maximum attempts that will be made to run a map
	 * task.
	 * 
	 * @param n
	 *            the number of attempts per map task.
	 */
	public void setMaxMapAttempts(int n) {
		// Recorded under JobContext.MAP_MAX_ATTEMPTS; n is stored unchecked.
		final String attemptsKey = JobContext.MAP_MAX_ATTEMPTS;
		setInt(attemptsKey, n);
	}

	/**
	 * Expert: Set the maximum percentage of map tasks that can fail without the
	 * job being aborted.
	 * 
	 * Each map task is executed a minimum of {@link #getMaxMapAttempts}
	 * attempts before being declared as <i>failed</i>.
	 * 
	 * @param percent
	 *            the maximum percentage of map tasks that can fail without the
	 *            job being aborted.
	 */
	public void setMaxMapTaskFailuresPercent(int percent) {
		// Recorded under JobContext.MAP_FAILURES_MAX_PERCENT; no range check
		// is applied here.
		final String failuresKey = JobContext.MAP_FAILURES_MAX_PERCENT;
		setInt(failuresKey, percent);
	}

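	/**
	 * @deprecated The value set is ignored; use
	 *             {@link #setMemoryForMapTask(long)} and
	 *             {@link #setMemoryForReduceTask(long)} instead.
	 */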
	@Deprecated
	public void setMaxPhysicalMemoryForTask(long mem) {
		// Nothing is stored: the argument is ignored and only a warning is
		// logged.
		final String warning = "The API setMaxPhysicalMemoryForTask() is deprecated."
				+ " The value set is ignored. Refer to "
				+ " setMemoryForMapTask() and setMemoryForReduceTask() for details.";
		LOG.warn(warning);
	}

	/**
	 * Expert: Set the number of maximum attempts that will be made to run a
	 * reduce task.
	 * 
	 * @param n
	 *            the number of attempts per reduce task.
	 */
	public void setMaxReduceAttempts(int n) {
		// Recorded under JobContext.REDUCE_MAX_ATTEMPTS; n is stored unchecked.
		final String attemptsKey = JobContext.REDUCE_MAX_ATTEMPTS;
		setInt(attemptsKey, n);
	}

	/**
	 * Set the maximum percentage of reduce tasks that can fail without the job
	 * being aborted.
	 * 
	 * Each reduce task is executed a minimum of {@link #getMaxReduceAttempts()}
	 * attempts before being declared as <i>failed</i>.
	 * 
	 * @param percent
	 *            the maximum percentage of reduce tasks that can fail without
	 *            the job being aborted.
	 */
	public void setMaxReduceTaskFailuresPercent(int percent) {
		// Recorded under JobContext.REDUCE_FAILURES_MAXPERCENT; no range check
		// is applied here.
		final String failuresKey = JobContext.REDUCE_FAILURES_MAXPERCENT;
		setInt(failuresKey, percent);
	}

	/**
	 * Set the maximum no. of failures of a given job per tasktracker. If the
	 * no. of task failures exceeds <code>noFailures</code>, the tasktracker is
	 * <i>blacklisted</i> for this job.
	 * 
	 * @param noFailures
	 *            maximum no. of failures of a given job per tasktracker.
	 */
	public void setMaxTaskFailuresPerTracker(int noFailures) {
		// Recorded under JobContext.MAX_TASK_FAILURES_PER_TRACKER.
		final String failuresKey = JobContext.MAX_TASK_FAILURES_PER_TRACKER;
		setInt(failuresKey, noFailures);
	}

	/**
	 * Set the maximum amount of memory any task of this job can use. See
	 * {@link #MAPRED_TASK_MAXVMEM_PROPERTY}
	 * <p/>
	 * mapred.task.maxvmem is split into mapreduce.map.memory.mb and
	 * mapreduce.reduce.memory.mb; each of the new keys is set to
	 * mapred.task.maxvmem / (1024 * 1024), as the new values are in MB
	 * 
	 * @param vmem
	 *            Maximum amount of virtual memory in bytes any task of this job
	 *            can use.
	 * @see #getMaxVirtualMemoryForTask()
	 * @deprecated Use {@link #setMemoryForMapTask(long)} and
	 *             {@link #setMemoryForReduceTask(long)}
	 */
	@Deprecated
	public void setMaxVirtualMemoryForTask(long vmem) {
		LOG.warn("setMaxVirtualMemoryForTask() is deprecated. "
				+ "Instead use setMemoryForMapTask() and setMemoryForReduceTask()");

		if (get(JobConf.MAPRED_TASK_MAXVMEM_PROPERTY) == null) {
			// The legacy key is not in use, so translate the byte count into
			// the per-task MB keys. A disabled or negative value must not go
			// through the division: integer division truncates toward zero,
			// which would turn e.g. -1 into a limit of 0 MB instead of
			// "no limit".
			final boolean unusable = vmem == DISABLED_MEMORY_LIMIT || vmem < 0;
			final long memInMb = unusable ? DISABLED_MEMORY_LIMIT
					: vmem / (1024 * 1024); // bytes -> MB
			setMemoryForMapTask(memInMb);
			setMemoryForReduceTask(memInMb);
		} else {
			// The legacy key is already present: keep writing to it. An
			// invalid negative value additionally disables the per-task keys.
			if (vmem != DISABLED_MEMORY_LIMIT && vmem < 0) {
				setMemoryForMapTask(DISABLED_MEMORY_LIMIT);
				setMemoryForReduceTask(DISABLED_MEMORY_LIMIT);
			}
			this.setLong(JobConf.MAPRED_TASK_MAXVMEM_PROPERTY, vmem);
		}
	}

	/**
	 * Set the memory required to run a map task of the job. The value is
	 * stored under {@link #MAPRED_JOB_MAP_MEMORY_MB_PROPERTY}, which
	 * {@link #getMemoryForMapTask()} reads back as MB.
	 * 
	 * @param mem
	 *            the value to store for map tasks.
	 */
	public void setMemoryForMapTask(long mem) {
		final String mapMemoryKey = JobConf.MAPRED_JOB_MAP_MEMORY_MB_PROPERTY;
		setLong(mapMemoryKey, mem);
	}

	/**
	 * Set the memory required to run a reduce task of the job. The value is
	 * stored under {@link #MAPRED_JOB_REDUCE_MEMORY_MB_PROPERTY}, which
	 * {@link #getMemoryForReduceTask()} reads back as MB.
	 * 
	 * @param mem
	 *            the value to store for reduce tasks.
	 */
	public void setMemoryForReduceTask(long mem) {
		final String reduceMemoryKey = JobConf.MAPRED_JOB_REDUCE_MEMORY_MB_PROPERTY;
		setLong(reduceMemoryKey, mem);
	}

	/**
	 * Set the number of map tasks for this job.
	 * 
	 * <p>
	 * <i>Note</i>: This is only a <i>hint</i> to the framework. The actual
	 * number of spawned map tasks depends on the number of {@link InputSplit}s
	 * generated by the job's {@link InputFormat#getSplits(JobConf, int)}.
	 * 
	 * A custom {@link InputFormat} is typically used to accurately control the
	 * number of map tasks for the job.
	 * </p>
	 * 
	 * <h4 id="NoOfMaps">How many maps?</h4>
	 * 
	 * <p>
	 * The number of maps is usually driven by the total size of the inputs i.e.
	 * total number of blocks of the input files.
	 * </p>
	 * 
	 * <p>
	 * The right level of parallelism for maps seems to be around 10-100 maps
	 * per-node, although it has been set up to 300 or so for very cpu-light map
	 * tasks. Task setup takes awhile, so it is best if the maps take at least a
	 * minute to execute.
	 * </p>
	 * 
	 * <p>
	 * The default behavior of file-based {@link InputFormat}s is to split the
	 * input into <i>logical</i> {@link InputSplit}s based on the total size, in
	 * bytes, of input files. However, the {@link FileSystem} blocksize of the
	 * input files is treated as an upper bound for input splits. A lower bound
	 * on the split size can be set via
	 * <a href="{@docRoot}/../mapred-default.html#mapreduce.input.fileinputformat.split.minsize">
	 * mapreduce.input.fileinputformat.split.minsize</a>.
	 * </p>
	 * 
	 * <p>
	 * Thus, if you expect 10TB of input data and have a blocksize of 128MB,
	 * you'll end up with 82,000 maps, unless {@link #setNumMapTasks(int)} is
	 * used to set it even higher.
	 * </p>
	 * 
	 * @param n
	 *            the number of map tasks for this job.
	 * @see InputFormat#getSplits(JobConf, int)
	 * @see FileInputFormat
	 * @see FileSystem#getDefaultBlockSize()
	 * @see FileStatus#getBlockSize()
	 */
	public void setNumMapTasks(int n) {
		// Recorded under JobContext.NUM_MAPS; n is stored unchecked.
		final String mapsKey = JobContext.NUM_MAPS;
		setInt(mapsKey, n);
	}

	/**
	 * Set the requisite number of reduce tasks for this job.
	 * 
	 * <h4 id="NoOfReduces">How many reduces?</h4>
	 * 
	 * <p>
	 * The right number of reduces seems to be <code>0.95</code> or
	 * <code>1.75</code> multiplied by (&lt;<i>no. of nodes</i>&gt; *
	 * <a href="{@docRoot}/../mapred-default.html#mapreduce.tasktracker.reduce.tasks.maximum">
	 * mapreduce.tasktracker.reduce.tasks.maximum</a>).
	 * </p>
	 * 
	 * <p>
	 * With <code>0.95</code> all of the reduces can launch immediately and
	 * start transferring map outputs as the maps finish. With <code>1.75</code>
	 * the faster nodes will finish their first round of reduces and launch a
	 * second wave of reduces doing a much better job of load balancing.
	 * </p>
	 * 
	 * <p>
	 * Increasing the number of reduces increases the framework overhead, but
	 * increases load balancing and lowers the cost of failures.
	 * </p>
	 * 
	 * <p>
	 * The scaling factors above are slightly less than whole numbers to reserve
	 * a few reduce slots in the framework for speculative-tasks, failures etc.
	 * </p>
	 * 
	 * <h4 id="ReducerNone">Reducer NONE</h4>
	 * 
	 * <p>
	 * It is legal to set the number of reduce-tasks to <code>zero</code>.
	 * </p>
	 * 
	 * <p>
	 * In this case the output of the map-tasks directly go to distributed
	 * file-system, to the path set by
	 * {@link FileOutputFormat#setOutputPath(JobConf, Path)}. Also, the
	 * framework doesn't sort the map-outputs before writing it out to HDFS.
	 * </p>
	 * 
	 * @param n
	 *            the number of reduce tasks for this job.
	 */
	public void setNumReduceTasks(int n) {
		// Recorded under JobContext.NUM_REDUCES; n is stored unchecked.
		final String reducesKey = JobContext.NUM_REDUCES;
		setInt(reducesKey, n);
	}

	/**
	 * Sets the number of tasks that a spawned task JVM should run before it
	 * exits
	 * 
	 * @param numTasks
	 *            the number of tasks to execute; defaults to 1; -1 signifies no
	 *            limit
	 */
	public void setNumTasksToExecutePerJvm(int numTasks) {
		// Recorded under JobContext.JVM_NUMTASKS_TORUN; stored unchecked.
		final String tasksPerJvmKey = JobContext.JVM_NUMTASKS_TORUN;
		setInt(tasksPerJvmKey, numTasks);
	}

	/**
	 * Set the {@link OutputCommitter} implementation for the map-reduce job.
	 * 
	 * @param theClass
	 *            the {@link OutputCommitter} implementation for the map-reduce
	 *            job.
	 */
	public void setOutputCommitter(Class<? extends OutputCommitter> theClass) {
		// Stored as a subclass of OutputCommitter.
		final String committerKey = "mapred.output.committer.class";
		setClass(committerKey, theClass, OutputCommitter.class);
	}

	/**
	 * Set the {@link OutputFormat} implementation for the map-reduce job.
	 * 
	 * @param theClass
	 *            the {@link OutputFormat} implementation for the map-reduce
	 *            job.
	 */
	public void setOutputFormat(Class<? extends OutputFormat> theClass) {
		// Stored as an implementation of the OutputFormat interface.
		final String outputFormatKey = "mapred.output.format.class";
		setClass(outputFormatKey, theClass, OutputFormat.class);
	}

	/**
	 * Set the key class for the job output data.
	 * 
	 * @param theClass
	 *            the key class for the job output data.
	 */
	public void setOutputKeyClass(Class<?> theClass) {
		// Object.class as the interface: any class is accepted.
		final String keyClassKey = JobContext.OUTPUT_KEY_CLASS;
		setClass(keyClassKey, theClass, Object.class);
	}

	/**
	 * Set the {@link RawComparator} comparator used to compare keys.
	 * 
	 * @param theClass
	 *            the {@link RawComparator} comparator used to compare keys.
	 * @see #setOutputValueGroupingComparator(Class)
	 */
	public void setOutputKeyComparatorClass(
			Class<? extends RawComparator> theClass) {
		// Stored as an implementation of the RawComparator interface.
		final String comparatorKey = JobContext.KEY_COMPARATOR;
		setClass(comparatorKey, theClass, RawComparator.class);
	}

	/**
	 * Set the value class for job outputs.
	 * 
	 * @param theClass
	 *            the value class for job outputs.
	 */
	public void setOutputValueClass(Class<?> theClass) {
		// Object.class as the interface: any class is accepted.
		final String valueClassKey = JobContext.OUTPUT_VALUE_CLASS;
		setClass(valueClassKey, theClass, Object.class);
	}

	/**
	 * Set the user defined {@link RawComparator} comparator for grouping keys
	 * in the input to the reduce.
	 * 
	 * <p>
	 * This comparator should be provided if the equivalence rules for keys for
	 * sorting the intermediates are different from those for grouping keys
	 * before each call to
	 * {@link Reducer#reduce(Object, java.util.Iterator, OutputCollector, Reporter)}
	 * .
	 * </p>
	 * 
	 * <p>
	 * For key-value pairs (K1,V1) and (K2,V2), the values (V1, V2) are passed
	 * in a single call to the reduce function if K1 and K2 compare as equal.
	 * </p>
	 * 
	 * <p>
	 * Since {@link #setOutputKeyComparatorClass(Class)} can be used to control
	 * how keys are sorted, this can be used in conjunction to simulate
	 * <i>secondary sort on values</i>.
	 * </p>
	 * 
	 * <p>
	 * <i>Note</i>: This is not a guarantee of the reduce sort being
	 * <i>stable</i> in any sense. (In any case, with the order of available
	 * map-outputs to the reduce being non-deterministic, it wouldn't make that
	 * much sense.)
	 * </p>
	 * 
	 * @param theClass
	 *            the comparator class to be used for grouping keys. It should
	 *            implement <code>RawComparator</code>.
	 * @see #setOutputKeyComparatorClass(Class)
	 */
	public void setOutputValueGroupingComparator(
			Class<? extends RawComparator> theClass) {
		// Stored as an implementation of the RawComparator interface.
		final String groupingKey = JobContext.GROUP_COMPARATOR_CLASS;
		setClass(groupingKey, theClass, RawComparator.class);
	}

	/**
	 * Set the {@link Partitioner} class used to partition {@link Mapper}
	 * -outputs to be sent to the {@link Reducer}s.
	 * 
	 * @param theClass
	 *            the {@link Partitioner} used to partition map-outputs.
	 */
	public void setPartitionerClass(Class<? extends Partitioner> theClass) {
		// Stored as an implementation of the Partitioner interface.
		final String partitionerKey = "mapred.partitioner.class";
		setClass(partitionerKey, theClass, Partitioner.class);
	}

	/**
	 * Set whether the system should collect profiler information for some of
	 * the tasks in this job? The information is stored in the user log
	 * directory.
	 * 
	 * @param newValue
	 *            true means it should be gathered
	 */
	public void setProfileEnabled(boolean newValue) {
		// Recorded under JobContext.TASK_PROFILE.
		final String profileKey = JobContext.TASK_PROFILE;
		setBoolean(profileKey, newValue);
	}

	/**
	 * Set the profiler configuration arguments. If the string contains a '%s'
	 * it will be replaced with the name of the profiling output file when the
	 * task runs.
	 * 
	 * This value is passed to the task child JVM on the command line.
	 * 
	 * @param value
	 *            the configuration string
	 */
	public void setProfileParams(String value) {
		// Stored as given; no '%s' substitution happens here.
		final String paramsKey = JobContext.TASK_PROFILE_PARAMS;
		set(paramsKey, value);
	}

	/**
	 * Set the ranges of maps or reduces to profile. setProfileEnabled(true)
	 * must also be called.
	 * 
	 * @param isMap
	 *            true to set the range for map tasks, false for reduce tasks
	 * @param newValue
	 *            a set of integer ranges of the task ids
	 */
	public void setProfileTaskRange(boolean isMap, String newValue) {
		// Parsing up front makes sure the value is legal before it is stored;
		// the parsed object itself is discarded.
		new Configuration.IntegerRanges(newValue);
		// Map and reduce ranges live under separate keys.
		final String rangeKey;
		if (isMap) {
			rangeKey = JobContext.NUM_MAP_PROFILES;
		} else {
			rangeKey = JobContext.NUM_REDUCE_PROFILES;
		}
		set(rangeKey, newValue);
	}

	/**
	 * Set the name of the queue to which this job should be submitted.
	 * 
	 * @param queueName
	 *            Name of the queue
	 */
	public void setQueueName(String queueName) {
		// Recorded under JobContext.QUEUE_NAME.
		final String queueKey = JobContext.QUEUE_NAME;
		set(queueKey, queueName);
	}

	/**
	 * Set the debug script to run when the reduce tasks fail.
	 * 
	 * <p>
	 * The debug script can aid debugging of failed reduce tasks. The script is
	 * given task's stdout, stderr, syslog, jobconf files as arguments.
	 * </p>
	 * 
	 * <p>
	 * The debug command, run on the node where the reduce failed, is:
	 * </p>
	 * <p>
	 * 
	 * <pre>
	 * <blockquote> 
	 * $script $stdout $stderr $syslog $jobconf.
	 * </blockquote>
	 * </pre>
	 * 
	 * </p>
	 * 
	 * <p>
	 * The script file is distributed through {@link DistributedCache} APIs. The
	 * script file needs to be symlinked
	 * </p>
	 * 
	 * <p>
	 * Here is an example on how to submit a script
	 * <p>
	 * <blockquote>
	 * 
	 * <pre>
	 * job.setReduceDebugScript(&quot;./myscript&quot;);
	 * DistributedCache.createSymlink(job);
	 * DistributedCache.addCacheFile(&quot;/debug/scripts/myscript#myscript&quot;);
	 * </pre>
	 * 
	 * </blockquote>
	 * </p>
	 * 
	 * @param rDbgScript
	 *            the script name
	 */
	public void setReduceDebugScript(String rDbgScript) {
		// Recorded under JobContext.REDUCE_DEBUG_SCRIPT.
		final String scriptKey = JobContext.REDUCE_DEBUG_SCRIPT;
		set(scriptKey, rDbgScript);
	}

	/**
	 * Set the {@link Reducer} class for the job.
	 * 
	 * @param theClass
	 *            the {@link Reducer} class for the job.
	 */
	public void setReducerClass(Class<? extends Reducer> theClass) {
		// Stored as an implementation of the Reducer interface.
		final String reducerKey = "mapred.reducer.class";
		setClass(reducerKey, theClass, Reducer.class);
	}

	/**
	 * Turn speculative execution on or off for this job for reduce tasks.
	 * 
	 * @param speculativeExecution
	 *            <code>true</code> if speculative execution should be turned on
	 *            for reduce tasks, else <code>false</code>.
	 */
	public void setReduceSpeculativeExecution(boolean speculativeExecution) {
		// Recorded under JobContext.REDUCE_SPECULATIVE.
		final String speculativeKey = JobContext.REDUCE_SPECULATIVE;
		setBoolean(speculativeKey, speculativeExecution);
	}

	/**
	 * Set the user-specified session identifier.
	 * 
	 * @param sessionId
	 *            the new session id.
	 */
	@Deprecated
	public void setSessionId(String sessionId) {
		// Same key that getSessionId() reads.
		final String sessionKey = "session.id";
		set(sessionKey, sessionId);
	}

	/**
	 * Turn speculative execution on or off for this job.
	 * 
	 * @param speculativeExecution
	 *            <code>true</code> if speculative execution should be turned
	 *            on, else <code>false</code>.
	 */
	public void setSpeculativeExecution(boolean speculativeExecution) {
		// One switch for both phases: the same flag is applied to map tasks
		// first and then to reduce tasks.
		this.setMapSpeculativeExecution(speculativeExecution);
		this.setReduceSpeculativeExecution(speculativeExecution);
	}

	/**
	 * Set whether the framework should use the new api for the mapper. This is
	 * the default for jobs submitted with the new Job api.
	 * 
	 * @param flag
	 *            true, if the new api should be used
	 */
	public void setUseNewMapper(boolean flag) {
		// Same key that getUseNewMapper() reads.
		final String newApiKey = "mapred.mapper.new-api";
		setBoolean(newApiKey, flag);
	}

	/**
	 * Set whether the framework should use the new api for the reducer. This is
	 * the default for jobs submitted with the new Job api.
	 * 
	 * @param flag
	 *            true, if the new api should be used
	 */
	public void setUseNewReducer(boolean flag) {
		// Same key that getUseNewReducer() reads.
		final String newApiKey = "mapred.reducer.new-api";
		setBoolean(newApiKey, flag);
	}

	/**
	 * Set the reported username for this job.
	 * 
	 * @param user
	 *            the username for this job.
	 */
	public void setUser(String user) {
		// Recorded under JobContext.USER_NAME.
		final String userKey = JobContext.USER_NAME;
		set(userKey, user);
	}

	/**
	 * Set the current working directory for the default file system.
	 * 
	 * @param dir
	 *            the new current working directory.
	 */
	public void setWorkingDirectory(Path dir) {
		// Resolve the argument against the current working directory before
		// storing it.
		final Path resolved = new Path(getWorkingDirectory(), dir);
		set(JobContext.WORKING_DIR, resolved.toString());
	}

}
